[
https://issues.apache.org/jira/browse/FLINK-36650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jerome Gagnon updated FLINK-36650:
----------------------------------
Description:
When using the {{ConfluentRegistryAvroSerializationSchema}} as the
serialization schema for a Kafka Sink, it looks like the {{writeSchema}}
function is always called from {{serialize}}:
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]
which in turn calls {{register}} on the schema registry:
[https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]
This is a problem for pre-registered types (i.e. generated classes shipped in
a library), where we do not want the application to change the schema and
where schema validity is guaranteed by the provided classes.
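For illustration, this is the kind of setup where the issue shows up: a sink
using {{forSpecific}} with an Avro-generated class whose schema is already
registered out-of-band. The record class, topic, and URLs below are
placeholders, not from the original report.
{code:java}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

public class PreRegisteredSinkExample {

    // MyGeneratedRecord stands for an Avro-generated SpecificRecord class
    // shipped in a shared library; its schema is registered out-of-band.
    public static KafkaSink<MyGeneratedRecord> buildSink() {
        ConfluentRegistryAvroSerializationSchema<MyGeneratedRecord> valueFormat =
                ConfluentRegistryAvroSerializationSchema.forSpecific(
                        MyGeneratedRecord.class, "orders-value", "http://schema-registry:8081");

        // Serialization with this format goes through writeSchema() and therefore
        // register(), so the registry credentials need Write access on the
        // subject even though the schema already exists.
        return KafkaSink.<MyGeneratedRecord>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("orders")
                                .setValueSerializationSchema(valueFormat)
                                .build())
                .build();
    }
}
{code}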
From a permission standpoint we want to restrict the schema registry API keys
to read-only permissions to avoid schema changes on write. Since the
{{register}} method is always called, serialization still fails with the
following error even though the write would effectively be a no-op:
{code:java}
Caused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
User is denied operation Write on Subject: [REDACTED]; error code: 40301
{code}
With a standard Kafka producer we would set {{auto.register.schemas}} to
{{false}} in the producer configuration, but this property does not seem to be
respected by the format. The fix for this bug would be to honor that
configuration: instead of calling {{register}}, look up the {{schema}} to
obtain the registered id, or use the one embedded in {{SpecificRecordBase}}
for {{AvroGenerated}} classes.
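A minimal sketch of what honoring such a flag could look like in the coder,
assuming an {{auto.register.schemas}}-style option is plumbed through. The
class name and the {{autoRegisterSchemas}} flag are hypothetical, not existing
Flink API; {{register}} and {{getId}} are real methods on the Confluent
{{SchemaRegistryClient}}.
{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

// Hypothetical coder that honors an auto-register flag; not existing Flink code.
public class ReadOnlyAwareSchemaRegistryCoder {

    private static final byte CONFLUENT_MAGIC_BYTE = 0x0;

    private final SchemaRegistryClient schemaRegistryClient;
    private final String subject;
    private final boolean autoRegisterSchemas;

    public ReadOnlyAwareSchemaRegistryCoder(
            SchemaRegistryClient schemaRegistryClient,
            String subject,
            boolean autoRegisterSchemas) {
        this.schemaRegistryClient = schemaRegistryClient;
        this.subject = subject;
        this.autoRegisterSchemas = autoRegisterSchemas;
    }

    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        try {
            AvroSchema avroSchema = new AvroSchema(schema);
            int schemaId =
                    autoRegisterSchemas
                            // current behaviour: requires Write permission on the subject
                            ? schemaRegistryClient.register(subject, avroSchema)
                            // read-only lookup of the id of the pre-registered schema
                            : schemaRegistryClient.getId(subject, avroSchema);

            // Confluent wire format: magic byte followed by the 4-byte schema id.
            out.write(CONFLUENT_MAGIC_BYTE);
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        } catch (RestClientException e) {
            throw new IOException("Could not resolve schema id in registry", e);
        }
    }
}
{code}
With {{autoRegisterSchemas}} set to {{false}}, the coder only performs a
read-only lookup of the already-registered schema, so a read-only API key is
sufficient.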
> ConfluentRegistryAvroSerializationSchema always tries to register the schema
> on serialize even if it already exists
> -------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36650
> URL: https://issues.apache.org/jira/browse/FLINK-36650
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.19.1
> Reporter: Jerome Gagnon
> Priority: Minor
>
> When using the {{ConfluentRegistryAvroSerializationSchema}} as the
> serialization schema for a Kafka Sink, it looks like the {{writeSchema}}
> function is always called from {{serialize}}:
> [https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L120]
> which in turn calls {{register}} on the schema registry:
> [https://github.com/apache/flink/blob/release-1.19.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85]
> This is a problem for pre-registered types (i.e. generated classes shipped in
> a library), where we do not want the application to change the schema and
> where schema validity is guaranteed by the provided classes.
> From a permission standpoint we want to restrict the schema registry API keys
> to read-only permissions to avoid schema changes on write. Since the
> {{register}} method is always called, serialization still fails with the
> following error even though the write would effectively be a no-op:
> {code:java}
> Caused by:
> io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> User is denied operation Write on Subject: [REDACTED]; error code: 40301
>
> {code}
> With a standard Kafka producer we would set {{auto.register.schemas}} to
> {{false}} in the producer configuration, but this property does not seem to
> be respected by the format. The fix for this bug would be to honor that
> configuration: instead of calling {{register}}, look up the {{schema}} to
> obtain the registered id, or use the one embedded in {{SpecificRecordBase}}
> for {{AvroGenerated}} classes.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)