[ 
https://issues.apache.org/jira/browse/FLINK-36650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jerome Gagnon updated FLINK-36650:
----------------------------------
    Description: 
When using {{ConfluentRegistryAvroSerializationSchema}} as the serialization schema for a Kafka sink, the {{writeSchema}} method is always called from {{serialize}}:
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]

which in turn calls {{register}} on the schema registry:
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]
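
For reference, the call path is roughly the following (a simplified paraphrase of the linked sources, not the exact code; field and constant names may differ):
{code:java}
// RegistryAvroSerializationSchema#serialize (simplified paraphrase)
public byte[] serialize(T object) throws IOException {
    arrayOutputStream.reset();
    // Writes the Confluent magic byte and the schema id; this is where register() is triggered.
    schemaCoder.writeSchema(getSchema(), arrayOutputStream);
    datumWriter.write(object, encoder);
    encoder.flush();
    return arrayOutputStream.toByteArray();
}

// ConfluentSchemaRegistryCoder#writeSchema (simplified paraphrase)
public void writeSchema(Schema schema, OutputStream out) throws IOException {
    // Always registers the schema, even when it already exists in the registry,
    // so the call needs Write permission on the subject.
    int registeredId = schemaRegistryClient.register(subject, schema);
    out.write(CONFLUENT_MAGIC_BYTE);
    out.write(ByteBuffer.allocate(4).putInt(registeredId).array());
}
{code}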

This is a problem for pre-registered types (i.e. Avro-generated classes shipped in a shared library), where we do not want the application to change the schema and where schema validity is guaranteed by the provided classes.

From a permission standpoint we want to restrict the schema registry API keys to read-only permissions so that the application cannot change schemas on write. Because the {{register}} method is always called, serialization still fails with the following error even though the write would effectively be a no-op (the schema is already registered):
{code:java}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: [REDACTED]; error code: 40301
{code}
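
For context, a sink set up along the following lines (topic, subject and registry URL are placeholders, and {{MyRecord}} stands for an Avro-generated {{SpecificRecordBase}} class) hits the error above on the first record it serializes, because the read-only API key cannot perform the implicit {{register}} call:
{code:java}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// Serialize values with the Confluent registry-aware Avro schema.
KafkaSink<MyRecord> sink =
        KafkaSink.<MyRecord>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<MyRecord>builder()
                                .setTopic("my-topic")
                                .setValueSerializationSchema(
                                        ConfluentRegistryAvroSerializationSchema.forSpecific(
                                                MyRecord.class,
                                                "my-topic-value",
                                                "https://schema-registry:8081"))
                                .build())
                .build();
{code}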
With a standard Kafka producer we would set {{auto.register.schemas}} to {{false}} in the producer configuration, but this property does not appear to be respected by the format. A fix would honour this setting: instead of calling {{register}}, the coder would look up the id of the already-registered schema, or use the schema embedded in the generated {{SpecificRecordBase}} ({{AvroGenerated}}) classes.
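
For comparison, with a plain Kafka producer and Confluent's {{KafkaAvroSerializer}} the registration is switched off through the serializer configuration (broker and registry URL are placeholders):
{code:java}
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "https://schema-registry:8081");
// With this set to false the serializer looks up the existing schema id instead of registering it.
props.put("auto.register.schemas", "false");
{code}
A rough sketch of how {{ConfluentSchemaRegistryCoder#writeSchema}} could honour the same flag is below; the {{autoRegisterSchemas}} field and its wiring are assumptions about a possible fix, not existing Flink code, while {{getId}} is an existing read-only lookup on Confluent's {{SchemaRegistryClient}}:
{code:java}
@Override
public void writeSchema(Schema schema, OutputStream out) throws IOException {
    try {
        final int schemaId;
        if (autoRegisterSchemas) {
            // Current behaviour: requires Write permission on the subject.
            schemaId = schemaRegistryClient.register(subject, new AvroSchema(schema));
        } else {
            // Read-only lookup of a schema that is expected to be pre-registered.
            schemaId = schemaRegistryClient.getId(subject, new AvroSchema(schema));
        }
        out.write(CONFLUENT_MAGIC_BYTE);
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
    } catch (RestClientException e) {
        throw new IOException("Could not resolve schema id in registry", e);
    }
}
{code}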

> ConfluentRegistryAvroSerializationSchema always tries to register the schema 
> on serialize even if it already exists
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36650
>                 URL: https://issues.apache.org/jira/browse/FLINK-36650
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.19.1
>            Reporter: Jerome Gagnon
>            Priority: Minor

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
